
<!DOCTYPE HTML>
<html lang="zh-hans" >
    <head>
        <meta charset="UTF-8">
        <meta content="text/html; charset=utf-8" http-equiv="Content-Type">
        <title>6.2 Kafka Stream 数据清洗案例 · 大数据 Fulume 课堂笔记</title>
        <meta http-equiv="X-UA-Compatible" content="IE=edge" />
        <meta name="description" content="">
        <meta name="generator" content="GitBook 3.2.3">
        <meta name="author" content=" 李振超">
        
        
    
    <link rel="stylesheet" href="../gitbook/style.css">

    
            
                
                <link rel="stylesheet" href="../gitbook/gitbook-plugin-splitter/splitter.css">
                
            
                
                <link rel="stylesheet" href="../gitbook/gitbook-plugin-expandable-chapters-small/expandable-chapters-small.css">
                
            
                
                <link rel="stylesheet" href="../gitbook/gitbook-plugin-page-toc-button/plugin.css">
                
            
                
                <link rel="stylesheet" href="../gitbook/gitbook-plugin-prism/prism-solarizedlight.css">
                
            
                
                <link rel="stylesheet" href="../gitbook/gitbook-plugin-tbfed-pagefooter/footer.css">
                
            
                
                <link rel="stylesheet" href="../gitbook/gitbook-plugin-change_girls/girls.css">
                
            
                
                <link rel="stylesheet" href="../gitbook/gitbook-plugin-search/search.css">
                
            
                
                <link rel="stylesheet" href="../gitbook/gitbook-plugin-fontsettings/website.css">
                
            
        

    

    
        
        <link rel="stylesheet" href="../styles/website.css">
        
    
        
    
        
    
        
    
        
    

        
    
    
    <meta name="HandheldFriendly" content="true"/>
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <meta name="apple-mobile-web-app-capable" content="yes">
    <meta name="apple-mobile-web-app-status-bar-style" content="black">
    <link rel="apple-touch-icon-precomposed" sizes="152x152" href="../gitbook/images/apple-touch-icon-precomposed-152.png">
    <link rel="shortcut icon" href="../gitbook/images/favicon.ico" type="image/x-icon">

    
    <link rel="next" href="../di-7-zhang-kuo-zhan.html" />
    
    
    <link rel="prev" href="61-gai-shu.html" />
    

    </head>
    <body>
        
<div class="book">
    <div class="book-summary">
        
            
<div id="book-search-input" role="search">
    <input type="text" placeholder="输入并搜索" aria-label="搜索" />
</div>

            
                <nav role="navigation">
                


<ul class="summary">
    
    
    
        
        <li>
            <a href="http://www.atguigu.com/" target="_blank" rel="noopener noreferrer" class="custom-link">联系我们</a>
        </li>
    
    

    
    <li class="divider"></li>
    

    
        
        
    
        <li class="chapter " data-level="1.1" data-path="../">
            
                <a href="../">
            
                    
                    第 1 章 Kafka 概述
            
                </a>
            

            
            <ul class="articles">
                
    
        <li class="chapter " data-level="1.1.1" data-path="../11-xiao-xi-dui-lie.html">
            
                <a href="../11-xiao-xi-dui-lie.html">
            
                    
                    1.1 消息队列
            
                </a>
            

            
        </li>
    
        <li class="chapter " data-level="1.1.2" data-path="../12-wei-shi-yao-xu-yao-xiao-xi-dui-lie.html">
            
                <a href="../12-wei-shi-yao-xu-yao-xiao-xi-dui-lie.html">
            
                    
                    1.2 为什么需要消息队列
            
                </a>
            

            
        </li>
    
        <li class="chapter " data-level="1.1.3" data-path="../13-shi-yao-shi-kafka.html">
            
                <a href="../13-shi-yao-shi-kafka.html">
            
                    
                    1.3 什么是 Kafka
            
                </a>
            

            
        </li>
    
        <li class="chapter " data-level="1.1.4" data-path="../14-kafka-jia-gou.html">
            
                <a href="../14-kafka-jia-gou.html">
            
                    
                    1.4 Kafka 架构
            
                </a>
            

            
        </li>
    

            </ul>
            
        </li>
    
        <li class="chapter " data-level="1.2" data-path="../chapter1.html">
            
                <a href="../chapter1.html">
            
                    
                    第 2 章 Kafka 集群部署
            
                </a>
            

            
            <ul class="articles">
                
    
        <li class="chapter " data-level="1.2.1" data-path="../chapter1/21-xia-zai-kafka.html">
            
                <a href="../chapter1/21-xia-zai-kafka.html">
            
                    
                    2.1 下载 Kafka
            
                </a>
            

            
        </li>
    
        <li class="chapter " data-level="1.2.2" data-path="../chapter1/22-bu-shu.html">
            
                <a href="../chapter1/22-bu-shu.html">
            
                    
                    2.2 部署
            
                </a>
            

            
        </li>
    
        <li class="chapter " data-level="1.2.3" data-path="../chapter1/23-kafka-chang-yong-ming-ling-cao-zuo.html">
            
                <a href="../chapter1/23-kafka-chang-yong-ming-ling-cao-zuo.html">
            
                    
                    2.3 Kafka 常用命令操作
            
                </a>
            

            
        </li>
    

            </ul>
            
        </li>
    
        <li class="chapter " data-level="1.3" data-path="../di-3-zhang-kafka-gong-zuo-liu-cheng-fen-xi.html">
            
                <a href="../di-3-zhang-kafka-gong-zuo-liu-cheng-fen-xi.html">
            
                    
                    第 3 章 Kafka 工作流程分析
            
                </a>
            

            
            <ul class="articles">
                
    
        <li class="chapter " data-level="1.3.1" data-path="../di-3-zhang-kafka-gong-zuo-liu-cheng-fen-xi/31-kafka-sheng-cheng-guo-cheng-fen-xi.html">
            
                <a href="../di-3-zhang-kafka-gong-zuo-liu-cheng-fen-xi/31-kafka-sheng-cheng-guo-cheng-fen-xi.html">
            
                    
                    3.1 Kafka 生产过程分析
            
                </a>
            

            
        </li>
    
        <li class="chapter " data-level="1.3.2" data-path="../di-3-zhang-kafka-gong-zuo-liu-cheng-fen-xi/32-broker-bao-cun-xiao-xi.html">
            
                <a href="../di-3-zhang-kafka-gong-zuo-liu-cheng-fen-xi/32-broker-bao-cun-xiao-xi.html">
            
                    
                    3.2 broker 保存消息
            
                </a>
            

            
        </li>
    
        <li class="chapter " data-level="1.3.3" data-path="../di-3-zhang-kafka-gong-zuo-liu-cheng-fen-xi/33-zookeeper-cun-chu-jie-gou.html">
            
                <a href="../di-3-zhang-kafka-gong-zuo-liu-cheng-fen-xi/33-zookeeper-cun-chu-jie-gou.html">
            
                    
                    3.3 Zookeeper 存储结构
            
                </a>
            

            
        </li>
    
        <li class="chapter " data-level="1.3.4" data-path="../di-3-zhang-kafka-gong-zuo-liu-cheng-fen-xi/34-kafka-xiao-fei-guo-cheng-fen-xi.html">
            
                <a href="../di-3-zhang-kafka-gong-zuo-liu-cheng-fen-xi/34-kafka-xiao-fei-guo-cheng-fen-xi.html">
            
                    
                    3.4 Kafka 消费过程分析
            
                </a>
            

            
        </li>
    
        <li class="chapter " data-level="1.3.5" data-path="../di-3-zhang-kafka-gong-zuo-liu-cheng-fen-xi/35-xiao-fei-zhe-zu-an-li.html">
            
                <a href="../di-3-zhang-kafka-gong-zuo-liu-cheng-fen-xi/35-xiao-fei-zhe-zu-an-li.html">
            
                    
                    3.5 消费者组案例
            
                </a>
            

            
        </li>
    

            </ul>
            
        </li>
    
        <li class="chapter " data-level="1.4" data-path="../di-4-zhang-api-shi-zhan.html">
            
                <a href="../di-4-zhang-api-shi-zhan.html">
            
                    
                    第 4 章 API 实战
            
                </a>
            

            
            <ul class="articles">
                
    
        <li class="chapter " data-level="1.4.1" data-path="../di-4-zhang-api-shi-zhan/41-huan-jing-zhun-bei.html">
            
                <a href="../di-4-zhang-api-shi-zhan/41-huan-jing-zhun-bei.html">
            
                    
                    4.1 环境准备
            
                </a>
            

            
        </li>
    
        <li class="chapter " data-level="1.4.2" data-path="../di-4-zhang-api-shi-zhan/42-kafka-sheng-chan-zhe-api.html">
            
                <a href="../di-4-zhang-api-shi-zhan/42-kafka-sheng-chan-zhe-api.html">
            
                    
                    4.2 Kafka 生产者 API
            
                </a>
            

            
        </li>
    
        <li class="chapter " data-level="1.4.3" data-path="../di-4-zhang-api-shi-zhan/43-kafka-xiao-fei-zhe-api.html">
            
                <a href="../di-4-zhang-api-shi-zhan/43-kafka-xiao-fei-zhe-api.html">
            
                    
                    4.3 Kafka 消费者 API
            
                </a>
            

            
            <ul class="articles">
                
    
        <li class="chapter " data-level="1.4.3.1" data-path="../di-4-zhang-api-shi-zhan/43-kafka-xiao-fei-zhe-api/431-kafka-xiao-fei-zhe-zhi-gao-ji-api.html">
            
                <a href="../di-4-zhang-api-shi-zhan/43-kafka-xiao-fei-zhe-api/431-kafka-xiao-fei-zhe-zhi-gao-ji-api.html">
            
                    
                    4.3.1 Kafka 消费者之高级 API
            
                </a>
            

            
        </li>
    
        <li class="chapter " data-level="1.4.3.2" data-path="../di-4-zhang-api-shi-zhan/43-kafka-xiao-fei-zhe-api/432-kafka-xiao-fei-zhe-zhi-di-ji-api.html">
            
                <a href="../di-4-zhang-api-shi-zhan/43-kafka-xiao-fei-zhe-api/432-kafka-xiao-fei-zhe-zhi-di-ji-api.html">
            
                    
                    4.3.2 Kafka 消费者之低级 API
            
                </a>
            

            
        </li>
    

            </ul>
            
        </li>
    

            </ul>
            
        </li>
    
        <li class="chapter " data-level="1.5" data-path="../di-5-zhang-lan-jie-qi.html">
            
                <a href="../di-5-zhang-lan-jie-qi.html">
            
                    
                    第 5 章 拦截器
            
                </a>
            

            
            <ul class="articles">
                
    
        <li class="chapter " data-level="1.5.1" data-path="../51-lan-jie-qi-yuan-li.html">
            
                <a href="../51-lan-jie-qi-yuan-li.html">
            
                    
                    5.1 拦截器原理
            
                </a>
            

            
        </li>
    
        <li class="chapter " data-level="1.5.2" data-path="../52-lan-jie-qi-an-li.html">
            
                <a href="../52-lan-jie-qi-an-li.html">
            
                    
                    5.2 拦截器案例
            
                </a>
            

            
        </li>
    

            </ul>
            
        </li>
    
        <li class="chapter " data-level="1.6" data-path="../di-6-zhang-kafka-stream.html">
            
                <a href="../di-6-zhang-kafka-stream.html">
            
                    
                    第 6 章 Kafka stream
            
                </a>
            

            
            <ul class="articles">
                
    
        <li class="chapter " data-level="1.6.1" data-path="61-gai-shu.html">
            
                <a href="61-gai-shu.html">
            
                    
                    6.1 概述
            
                </a>
            

            
        </li>
    
        <li class="chapter active" data-level="1.6.2" data-path="62-kafka-stream-shu-ju-qing-xi-an-li.html">
            
                <a href="62-kafka-stream-shu-ju-qing-xi-an-li.html">
            
                    
                    6.2 Kafka Stream 数据清洗案例
            
                </a>
            

            
        </li>
    

            </ul>
            
        </li>
    
        <li class="chapter " data-level="1.7" data-path="../di-7-zhang-kuo-zhan.html">
            
                <a href="../di-7-zhang-kuo-zhan.html">
            
                    
                    第 7 章 扩展
            
                </a>
            

            
            <ul class="articles">
                
    
        <li class="chapter " data-level="1.7.1" data-path="../di-7-zhang-kuo-zhan/71-kafka-he-flume-ji-cheng.html">
            
                <a href="../di-7-zhang-kuo-zhan/71-kafka-he-flume-ji-cheng.html">
            
                    
                    7.1 Kafka 和 Flume 比较
            
                </a>
            

            
        </li>
    
        <li class="chapter " data-level="1.7.2" data-path="../di-7-zhang-kuo-zhan/72-kafka-he-flume-ji-cheng.html">
            
                <a href="../di-7-zhang-kuo-zhan/72-kafka-he-flume-ji-cheng.html">
            
                    
                    7.2 Kafka 和 Flume 集成
            
                </a>
            

            
        </li>
    
        <li class="chapter " data-level="1.7.3" data-path="../di-7-zhang-kuo-zhan/73-kafka-pei-zhi-xin-xi.html">
            
                <a href="../di-7-zhang-kuo-zhan/73-kafka-pei-zhi-xin-xi.html">
            
                    
                    7.3 Kafka 配置信息
            
                </a>
            

            
            <ul class="articles">
                
    
        <li class="chapter " data-level="1.7.3.1" data-path="../di-7-zhang-kuo-zhan/73-kafka-pei-zhi-xin-xi/731-broker-pei-zhi-xin-xi.html">
            
                <a href="../di-7-zhang-kuo-zhan/73-kafka-pei-zhi-xin-xi/731-broker-pei-zhi-xin-xi.html">
            
                    
                    7.3.1 broker 配置信息
            
                </a>
            

            
        </li>
    
        <li class="chapter " data-level="1.7.3.2" data-path="../di-7-zhang-kuo-zhan/73-kafka-pei-zhi-xin-xi/732-producer-pei-zhi-xin-xi.html">
            
                <a href="../di-7-zhang-kuo-zhan/73-kafka-pei-zhi-xin-xi/732-producer-pei-zhi-xin-xi.html">
            
                    
                    7.3.2 Producer 配置信息
            
                </a>
            

            
        </li>
    
        <li class="chapter " data-level="1.7.3.3" data-path="../di-7-zhang-kuo-zhan/73-kafka-pei-zhi-xin-xi/733-consumer-pei-zhi-xin-xi.html">
            
                <a href="../di-7-zhang-kuo-zhan/73-kafka-pei-zhi-xin-xi/733-consumer-pei-zhi-xin-xi.html">
            
                    
                    7.3.3 Consumer 配置信息
            
                </a>
            

            
        </li>
    

            </ul>
            
        </li>
    

            </ul>
            
        </li>
    

    

    <li class="divider"></li>

    <li>
        <a href="https://www.gitbook.com" target="_blank" rel="noopener noreferrer" class="gitbook-link">
            本书使用 GitBook 发布
        </a>
    </li>
</ul>


                </nav>
            
        
    </div>

    <div class="book-body">
        
            <div class="body-inner">
                
                    

<div class="book-header" role="navigation">
    

    <!-- Title -->
    <h1>
        <i class="fa fa-circle-o-notch fa-spin"></i>
        <a href=".." >6.2 Kafka Stream 数据清洗案例</a>
    </h1>
</div>




                    <div class="page-wrapper" tabindex="-1" role="main">
                        <div class="page-inner">
                            
<div id="book-search-results">
    <div class="search-noresults">
    
                                <section class="normal markdown-section">
                                
                                <h1 id="62-kafka-stream-&#x6570;&#x636E;&#x6E05;&#x6D17;">6.2 Kafka Stream &#x6570;&#x636E;&#x6E05;&#x6D17;</h1>
<h2 id="&#x9700;&#x6C42;">&#x9700;&#x6C42;</h2>
<p>&#x5B9E;&#x65F6;&#x5904;&#x7406;&#x5355;&#x8BCD;&#x5E26;&#x6709;<code>&gt;&gt;&gt;</code>&#x524D;&#x7F00;&#x7684;&#x5185;&#x5BB9;&#x3002;&#x4F8B;&#x5982;&#x8F93;&#x5165; <code>atguigu&gt;&gt;&gt;ximenqing</code>&#xFF0C;&#x6700;&#x7EC8;&#x5904;&#x7406;&#x6210; <code>ximenqing</code></p>
<hr>
<h2 id="&#x9700;&#x6C42;&#x5206;&#x6790;">&#x9700;&#x6C42;&#x5206;&#x6790;</h2>
<p><img src="http://lizhenchao.oss-cn-shenzhen.aliyuncs.com/1541905676.png-atguiguText" alt="&#x9700;&#x6C42;&#x5206;&#x6790;&#x56FE;"></p>
<h2 id="&#x5B9E;&#x4F8B;&#x64CD;&#x4F5C;">&#x5B9E;&#x4F8B;&#x64CD;&#x4F5C;</h2>
<h3 id="1-&#x6DFB;&#x52A0;&#x4F9D;&#x8D56;">1. &#x6DFB;&#x52A0;&#x4F9D;&#x8D56;</h3>
<pre class="language-"><code class="lang-xml"><span class="token comment" spellcheck="true">&lt;!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams --&gt;</span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>dependency</span><span class="token punctuation">&gt;</span></span>
    <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>groupId</span><span class="token punctuation">&gt;</span></span>org.apache.kafka<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>groupId</span><span class="token punctuation">&gt;</span></span>
    <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>artifactId</span><span class="token punctuation">&gt;</span></span>kafka-streams<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>artifactId</span><span class="token punctuation">&gt;</span></span>
    <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>version</span><span class="token punctuation">&gt;</span></span>0.11.0.3<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>version</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>dependency</span><span class="token punctuation">&gt;</span></span>
</code></pre>
<h3 id="2-&#x4EE3;&#x7801;">2. &#x4EE3;&#x7801;</h3>
<h4 id="&#x4E3B;&#x7C7B;">&#x4E3B;&#x7C7B;</h4>
<pre class="language-"><code class="lang-java"><span class="token keyword">package</span> com<span class="token punctuation">.</span>atguigu<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>stream<span class="token punctuation">;</span>

<span class="token keyword">import</span> org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>streams<span class="token punctuation">.</span>KafkaStreams<span class="token punctuation">;</span>
<span class="token keyword">import</span> org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>streams<span class="token punctuation">.</span>StreamsConfig<span class="token punctuation">;</span>
<span class="token keyword">import</span> org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>streams<span class="token punctuation">.</span>processor<span class="token punctuation">.</span>Processor<span class="token punctuation">;</span>
<span class="token keyword">import</span> org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>streams<span class="token punctuation">.</span>processor<span class="token punctuation">.</span>ProcessorSupplier<span class="token punctuation">;</span>
<span class="token keyword">import</span> org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>streams<span class="token punctuation">.</span>processor<span class="token punctuation">.</span>TopologyBuilder<span class="token punctuation">;</span>

<span class="token keyword">import</span> java<span class="token punctuation">.</span>util<span class="token punctuation">.</span>Properties<span class="token punctuation">;</span>

<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">MyStream</span> <span class="token punctuation">{</span>
    <span class="token keyword">public</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">main</span><span class="token punctuation">(</span>String<span class="token punctuation">[</span><span class="token punctuation">]</span> args<span class="token punctuation">)</span> <span class="token punctuation">{</span>
        <span class="token keyword">final</span> String sourceTopic <span class="token operator">=</span> <span class="token string">&quot;first&quot;</span><span class="token punctuation">;</span>
        <span class="token keyword">final</span> String sinkTopic <span class="token operator">=</span> <span class="token string">&quot;second&quot;</span><span class="token punctuation">;</span>
        <span class="token comment" spellcheck="true">// 1. &#x8BBE;&#x7F6E;&#x53C2;&#x6570;</span>
        Properties props <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">Properties</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span>StreamsConfig<span class="token punctuation">.</span>APPLICATION_ID_CONFIG<span class="token punctuation">,</span> <span class="token string">&quot;mystreams&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span>StreamsConfig<span class="token punctuation">.</span>BOOTSTRAP_SERVERS_CONFIG<span class="token punctuation">,</span> <span class="token string">&quot;hadoop201:9092&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment" spellcheck="true">// 2. &#x521B;&#x5EFA;&#x62D3;&#x6251;&#x7ED3;&#x6784;builder&#x5BF9;&#x8C61;</span>
        TopologyBuilder builder <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">TopologyBuilder</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
                <span class="token punctuation">.</span><span class="token function">addSource</span><span class="token punctuation">(</span><span class="token string">&quot;source&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;first&quot;</span><span class="token punctuation">)</span>
                <span class="token punctuation">.</span><span class="token function">addProcessor</span><span class="token punctuation">(</span><span class="token string">&quot;processor&quot;</span><span class="token punctuation">,</span> <span class="token keyword">new</span> <span class="token class-name">MySupplier</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token string">&quot;source&quot;</span><span class="token punctuation">)</span>
                <span class="token punctuation">.</span><span class="token function">addSink</span><span class="token punctuation">(</span><span class="token string">&quot;sink&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;second&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;processor&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>

        KafkaStreams streams <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">KafkaStreams</span><span class="token punctuation">(</span>builder<span class="token punctuation">,</span> props<span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment" spellcheck="true">// 3. &#x542F;&#x52A8; kafka streams</span>
        streams<span class="token punctuation">.</span><span class="token function">start</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
    <span class="token punctuation">}</span>
<span class="token punctuation">}</span>
</code></pre>
<h4 id="&#x5177;&#x4F53;&#x4E1A;&#x5904;&#x7406;">&#x5177;&#x4F53;&#x4E1A;&#x52A1;&#x5904;&#x7406;</h4>
<pre class="language-"><code class="lang-java"><span class="token keyword">package</span> com<span class="token punctuation">.</span>atguigu<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>stream<span class="token punctuation">;</span>

<span class="token keyword">import</span> org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>streams<span class="token punctuation">.</span>processor<span class="token punctuation">.</span>Processor<span class="token punctuation">;</span>
<span class="token keyword">import</span> org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>streams<span class="token punctuation">.</span>processor<span class="token punctuation">.</span>ProcessorContext<span class="token punctuation">;</span>
<span class="token keyword">import</span> org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>streams<span class="token punctuation">.</span>processor<span class="token punctuation">.</span>ProcessorSupplier<span class="token punctuation">;</span>

<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">MySupplier</span> <span class="token keyword">implements</span> <span class="token class-name">ProcessorSupplier</span><span class="token operator">&lt;</span><span class="token keyword">byte</span><span class="token punctuation">[</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token keyword">byte</span><span class="token punctuation">[</span><span class="token punctuation">]</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
    <span class="token annotation punctuation">@Override</span>
    <span class="token keyword">public</span> Processor<span class="token operator">&lt;</span><span class="token keyword">byte</span><span class="token punctuation">[</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token keyword">byte</span><span class="token punctuation">[</span><span class="token punctuation">]</span><span class="token operator">&gt;</span> <span class="token function">get</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
        <span class="token keyword">return</span> <span class="token keyword">new</span> <span class="token class-name">MyProcessor</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
    <span class="token punctuation">}</span>

    <span class="token keyword">private</span> <span class="token keyword">class</span> <span class="token class-name">MyProcessor</span> <span class="token keyword">implements</span> <span class="token class-name">Processor</span><span class="token operator">&lt;</span><span class="token keyword">byte</span><span class="token punctuation">[</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token keyword">byte</span><span class="token punctuation">[</span><span class="token punctuation">]</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>

        <span class="token keyword">private</span> ProcessorContext context<span class="token punctuation">;</span>

        <span class="token annotation punctuation">@Override</span>
        <span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">init</span><span class="token punctuation">(</span>ProcessorContext context<span class="token punctuation">)</span> <span class="token punctuation">{</span>
            <span class="token keyword">this</span><span class="token punctuation">.</span>context <span class="token operator">=</span> context<span class="token punctuation">;</span>
        <span class="token punctuation">}</span>

        <span class="token annotation punctuation">@Override</span>
        <span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">process</span><span class="token punctuation">(</span><span class="token keyword">byte</span><span class="token punctuation">[</span><span class="token punctuation">]</span> key<span class="token punctuation">,</span> <span class="token keyword">byte</span><span class="token punctuation">[</span><span class="token punctuation">]</span> value<span class="token punctuation">)</span> <span class="token punctuation">{</span>
            <span class="token keyword">byte</span><span class="token punctuation">[</span><span class="token punctuation">]</span> newValue <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">String</span><span class="token punctuation">(</span>value<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">replace</span><span class="token punctuation">(</span><span class="token string">&quot;&gt;&gt;&gt;&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;&quot;</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">getBytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
            context<span class="token punctuation">.</span><span class="token function">forward</span><span class="token punctuation">(</span>key<span class="token punctuation">,</span> newValue<span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token punctuation">}</span>

        <span class="token annotation punctuation">@Override</span>
        <span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">punctuate</span><span class="token punctuation">(</span><span class="token keyword">long</span> l<span class="token punctuation">)</span> <span class="token punctuation">{</span>

        <span class="token punctuation">}</span>

        <span class="token annotation punctuation">@Override</span>
        <span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">close</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>

        <span class="token punctuation">}</span>
    <span class="token punctuation">}</span>
<span class="token punctuation">}</span>
</code></pre>
<h3 id="3-&#x6D4B;&#x8BD5;">3. &#x6D4B;&#x8BD5;</h3>
<h4 id="a&#x8FD0;&#x884C;&#x4E3B;&#x7A0B;&#x5E8F;">a:&#x8FD0;&#x884C;&#x4E3B;&#x7A0B;&#x5E8F;</h4>
<h4 id="b&#x5F00;&#x542F;&#x751F;&#x6210;&#x8005;&#x5411;-topic-first-&#x5199;&#x5165;&#x6570;&#x636E;">b:&#x5F00;&#x542F;&#x751F;&#x4EA7;&#x8005;&#x5411; topic first &#x5199;&#x5165;&#x6570;&#x636E;</h4>
<pre class="language-"><code class="lang-bash">kafka-console-producer.sh --broker-list hadoop201:9092 --topic first
</code></pre>
<h4 id="c-&#x5F00;&#x542F;&#x6D88;&#x8D39;&#x8005;&#x4ECE;-topic-second-&#x6D88;&#x8D39;&#x6570;&#x636E;">c: &#x5F00;&#x542F;&#x6D88;&#x8D39;&#x8005;&#x4ECE; topic second &#x6D88;&#x8D39;&#x6570;&#x636E;</h4>
<pre class="language-"><code class="lang-bash">kafka-console-consumer.sh --zookeeper hadoop201:2181  --topic second
</code></pre>
<h4 id="d-&#x5F00;&#x59CB;&#x5199;&#x5165;&#x6570;&#x636E;-&#x5E76;&#x89C2;&#x5BDF;&#x6D88;&#x8D39;&#x6D88;&#x8D39;&#x7684;&#x6570;&#x636E;">d: &#x5F00;&#x59CB;&#x5199;&#x5165;&#x6570;&#x636E;, &#x5E76;&#x89C2;&#x5BDF;&#x6D88;&#x8D39;&#x5230;&#x7684;&#x6570;&#x636E;</h4>
<p>&#x5199;&#x5165;&#x7684;&#x6570;&#x636E;:
<img src="http://lizhenchao.oss-cn-shenzhen.aliyuncs.com/1541925036.png-atguiguText" alt="向 topic first 写入的数据截图"></p>
<p>&#x6D88;&#x8D39;&#x5230;&#x7684;&#x6570;&#x636E;:
<img src="http://lizhenchao.oss-cn-shenzhen.aliyuncs.com/1541925064.png-atguiguText" alt="从 topic second 消费到的数据截图"></p>
<footer class="page-footer"><span class="copyright">Copyright &#xA9; &#x5C1A;&#x7845;&#x8C37;&#x5927;&#x6570;&#x636E; 2019 all rights reserved&#xFF0C;powered by Gitbook</span><span class="footer-modification"><br>&#x8BE5;&#x6587;&#x4EF6;&#x6700;&#x540E;&#x4FEE;&#x8BA2;&#x65F6;&#x95F4;&#xFF1A;
2018-12-20 21:42:38
</span></footer>
                                
                                </section>
                            
    </div>
    <div class="search-results">
        <div class="has-results">
            
            <h1 class="search-results-title"><span class='search-results-count'></span> results matching "<span class='search-query'></span>"</h1>
            <ul class="search-results-list"></ul>
            
        </div>
        <div class="no-results">
            
            <h1 class="search-results-title">No results matching "<span class='search-query'></span>"</h1>
            
        </div>
    </div>
</div>

                        </div>
                    </div>
                
            </div>

            
                
                <a href="61-gai-shu.html" class="navigation navigation-prev " aria-label="Previous page: 6.1 概述">
                    <i class="fa fa-angle-left"></i>
                </a>
                
                
                <a href="../di-7-zhang-kuo-zhan.html" class="navigation navigation-next " aria-label="Next page: 第 7 章 扩展">
                    <i class="fa fa-angle-right"></i>
                </a>
                
            
        
    </div>

    <script>
        var gitbook = gitbook || [];
        gitbook.push(function() {
            gitbook.page.hasChanged({"page":{"title":"6.2 Kafka Stream 数据清洗案例","level":"1.6.2","depth":2,"next":{"title":"第 7 章 扩展","level":"1.7","depth":1,"path":"di-7-zhang-kuo-zhan.md","ref":"di-7-zhang-kuo-zhan.md","articles":[{"title":"7.1 Kafka 和 Flume 比较","level":"1.7.1","depth":2,"path":"di-7-zhang-kuo-zhan/71-kafka-he-flume-ji-cheng.md","ref":"di-7-zhang-kuo-zhan/71-kafka-he-flume-ji-cheng.md","articles":[]},{"title":"7.2 Kafka 和 Flume 集成","level":"1.7.2","depth":2,"path":"di-7-zhang-kuo-zhan/72-kafka-he-flume-ji-cheng.md","ref":"di-7-zhang-kuo-zhan/72-kafka-he-flume-ji-cheng.md","articles":[]},{"title":"7.3 Kafka 配置信息","level":"1.7.3","depth":2,"path":"di-7-zhang-kuo-zhan/73-kafka-pei-zhi-xin-xi.md","ref":"di-7-zhang-kuo-zhan/73-kafka-pei-zhi-xin-xi.md","articles":[{"title":"7.3.1 broker 配置信息","level":"1.7.3.1","depth":3,"path":"di-7-zhang-kuo-zhan/73-kafka-pei-zhi-xin-xi/731-broker-pei-zhi-xin-xi.md","ref":"di-7-zhang-kuo-zhan/73-kafka-pei-zhi-xin-xi/731-broker-pei-zhi-xin-xi.md","articles":[]},{"title":"7.3.2 Producer 配置信息","level":"1.7.3.2","depth":3,"path":"di-7-zhang-kuo-zhan/73-kafka-pei-zhi-xin-xi/732-producer-pei-zhi-xin-xi.md","ref":"di-7-zhang-kuo-zhan/73-kafka-pei-zhi-xin-xi/732-producer-pei-zhi-xin-xi.md","articles":[]},{"title":"7.3.3 Consumer 配置信息","level":"1.7.3.3","depth":3,"path":"di-7-zhang-kuo-zhan/73-kafka-pei-zhi-xin-xi/733-consumer-pei-zhi-xin-xi.md","ref":"di-7-zhang-kuo-zhan/73-kafka-pei-zhi-xin-xi/733-consumer-pei-zhi-xin-xi.md","articles":[]}]}]},"previous":{"title":"6.1 
概述","level":"1.6.1","depth":2,"path":"di-6-zhang-kafka-stream/61-gai-shu.md","ref":"di-6-zhang-kafka-stream/61-gai-shu.md","articles":[]},"dir":"ltr"},"config":{"plugins":["splitter","expandable-chapters-small","page-toc-button","-highlight","prism","prism-themes","tbfed-pagefooter","sitemap","change_girls","livereload"],"styles":{"website":"styles/website.css","ebook":"styles/ebook.css","pdf":"styles/pdf.css","mobi":"styles/mobi.css","epub":"styles/epub.css"},"pluginsConfig":{"tbfed-pagefooter":{"copyright":"Copyright &copy 尚硅谷大数据 2019","modify_label":"<br>该文件最后修订时间：","modify_format":"YYYY-MM-DD HH:mm:ss"},"prism":{"css":["prismjs/themes/prism-solarizedlight.css"]},"disqus":{"shortName":"zhenchao125"},"livereload":{},"splitter":{},"change_girls":{"time":30,"urls":["http://lizhenchao.oss-cn-shenzhen.aliyuncs.com/imgs/18-3-4/62171757.jpg-atguiguImg","http://lizhenchao.oss-cn-shenzhen.aliyuncs.com/imgs/18-3-4/83394736.jpg-atguiguImg","http://lizhenchao.oss-cn-shenzhen.aliyuncs.com/imgs/18-3-4/15906185.jpg-atguiguImg","http://lizhenchao.oss-cn-shenzhen.aliyuncs.com/imgs/18-3-4/15906185.jpg-atguiguImg","http://lizhenchao.oss-cn-shenzhen.aliyuncs.com/imgs/18-3-4/3394270.jpg-atguiguImg","http://lizhenchao.oss-cn-shenzhen.aliyuncs.com/imgs/18-3-4/65028582.jpg-atguiguImg","http://lizhenchao.oss-cn-shenzhen.aliyuncs.com/imgs/18-3-4/20359261.jpg-atguiguImg","http://lizhenchao.oss-cn-shenzhen.aliyuncs.com/imgs/18-3-4/37472401.jpg-atguiguImg","http://lizhenchao.oss-cn-shenzhen.aliyuncs.com/imgs/18-3-4/3355397.jpg-atguiguImg","http://lizhenchao.oss-cn-shenzhen.aliyuncs.com/imgs/18-3-4/11075549.jpg-atguiguImg"]},"search":{},"lunr":{"maxIndexSize":1000000,"ignoreSpecialCharacters":false},"fontsettings":{"theme":"white","family":"sans","size":2},"sitemap":{"hostname":"https://www.gitbook.com/@zhenchao125"},"page-toc-button":{"maxTocDepth":3,"minTocSize":2},"prism-themes":{},"expandable-chapters-small":{},"sharing":{"facebook":true,"twitter":true,"google":false,"weibo":false,"instap
aper":false,"vk":false,"all":["facebook","google","twitter","weibo","instapaper"]},"theme-default":{"styles":{"website":"styles/website.css","pdf":"styles/pdf.css","epub":"styles/epub.css","mobi":"styles/mobi.css","ebook":"styles/ebook.css","print":"styles/print.css"},"showLevel":false}},"theme":"default","author":" 李振超","pdf":{"pageNumbers":true,"fontSize":12,"fontFamily":"Arial","paperSize":"a4","chapterMark":"pagebreak","pageBreaksBefore":"/","margin":{"right":62,"left":62,"top":56,"bottom":56}},"structure":{"langs":"LANGS.md","readme":"README.md","glossary":"GLOSSARY.md","summary":"SUMMARY.md"},"variables":{},"title":"大数据 Fulume 课堂笔记","language":"zh-hans","links":{"sidebar":{"联系我们":"http://www.atguigu.com/"}},"gitbook":"*","description":"课堂笔记,学生提前预习"},"file":{"path":"di-6-zhang-kafka-stream/62-kafka-stream-shu-ju-qing-xi-an-li.md","mtime":"2018-12-20T13:42:38.286Z","type":"markdown"},"gitbook":{"version":"3.2.3","time":"2019-01-25T01:42:15.294Z"},"basePath":"..","book":{"language":""}});
        });
    </script>
</div>

        
    <script src="../gitbook/gitbook.js"></script>
    <script src="../gitbook/theme.js"></script>
    
        
        <script src="../gitbook/gitbook-plugin-splitter/splitter.js"></script>
        
    
        
        <script src="../gitbook/gitbook-plugin-expandable-chapters-small/expandable-chapters-small.js"></script>
        
    
        
        <script src="../gitbook/gitbook-plugin-page-toc-button/plugin.js"></script>
        
    
        
        <script src="../gitbook/gitbook-plugin-change_girls/girls.js"></script>
        
    
        
        <script src="../gitbook/gitbook-plugin-livereload/plugin.js"></script>
        
    
        
        <script src="../gitbook/gitbook-plugin-search/search-engine.js"></script>
        
    
        
        <script src="../gitbook/gitbook-plugin-search/search.js"></script>
        
    
        
        <script src="../gitbook/gitbook-plugin-lunr/lunr.min.js"></script>
        
    
        
        <script src="../gitbook/gitbook-plugin-lunr/search-lunr.js"></script>
        
    
        
        <script src="../gitbook/gitbook-plugin-sharing/buttons.js"></script>
        
    
        
        <script src="../gitbook/gitbook-plugin-fontsettings/fontsettings.js"></script>
        
    

    </body>
</html>

